
Client rack support#1159

Merged
ods merged 26 commits into aio-libs:master from exness:master
Apr 23, 2026

Conversation

@GlebShipilov
Contributor

@GlebShipilov GlebShipilov commented Apr 21, 2026

Add support for KIP-392 (fetch from closest replica) so consumers can read from a same-rack follower instead of the partition leader, eliminating cross-AZ / cross-region traffic when the cluster is configured with a rack-aware replica.selector.class.

  • AIOKafkaConsumer: new client_rack kwarg (default "" keeps current behaviour). When set, the rack id is sent in FetchRequest v11+ and the broker may respond with a preferred read replica.
  • Protocol: added FetchRequest/FetchResponse versions 5–11. v11 carries the rack_id field on requests and preferred_read_replica on responses; intermediate versions add LSO, leader epoch, and incremental fetch session fields needed to get there.
  • Fetcher: new per-partition cache tp -> (preferred_replica_node_id, expires_at) with TTL bound to metadata_max_age_ms. _select_read_replica honours the cache when valid, otherwise falls back to the leader. Cache is invalidated on NotLeaderForPartition, UnknownTopicOrPartition, OffsetOutOfRange, or when the chosen broker disappears from cluster metadata.
  • Correct handling of preferred_read_replica == -1: the meaning depends on the responder (per KIP-392). -1 from the leader means "no preferred replica, read from me" → drop the cache. -1 from the previously selected follower means "I am still the right one, keep using me" → keep the cache. Without this distinction the consumer oscillated between leader and follower on every fetch and rack-aware routing produced no measurable benefit.
  • Added documentation and unit tests covering the cache TTL, both -1 cases, error-driven invalidation, fall-through to the leader when no live preferred replica is known, and back-compat with brokers that only speak < v11.

Verified against a 3-rack test Kafka cluster: 100% of records were served from same-rack brokers, with the expected mix of leader and follower fetches per partition.
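The per-partition cache and the two `-1` cases described above can be sketched in plain Python (class and method names are illustrative stand-ins, not aiokafka's actual internals):

```python
import time


class PreferredReplicaCache:
    """Illustrative sketch of the Fetcher's preferred-replica cache."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        # tp -> (preferred_replica_node_id, expires_at)
        self._cache: dict[str, tuple[int, float]] = {}

    def select_read_replica(self, tp: str, leader: int) -> int:
        entry = self._cache.get(tp)
        if entry is not None:
            node_id, expires_at = entry
            if time.monotonic() < expires_at:
                return node_id
            del self._cache[tp]  # TTL expired: fall back to the leader
        return leader

    def update(self, tp: str, preferred: int, responder: int, leader: int) -> None:
        if preferred >= 0:
            self._cache[tp] = (preferred, time.monotonic() + self._ttl)
        elif responder == leader:
            # -1 from the leader: "no preferred replica, read from me".
            self._cache.pop(tp, None)
        # -1 from the previously chosen follower: "keep using me";
        # leave the cache entry alone.

    def invalidate(self, tp: str) -> None:
        # Called on NotLeaderForPartition / UnknownTopicOrPartition /
        # OffsetOutOfRange, or when the node leaves cluster metadata.
        self._cache.pop(tp, None)
```

Without the responder check in `update`, a `-1` from a follower would clear the cache and the next fetch would go back to the leader, which is exactly the oscillation described above.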

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder (1159.feature)

GlebShipilov and others added 10 commits April 20, 2026 20:44
Adds a 'client_rack' option to AIOKafkaConsumer. When set and the broker
supports FetchRequest v11+ (Kafka 2.4+) with a configured
replica.selector.class such as RackAwareReplicaSelector, the consumer
fetches from the closest in-sync replica instead of the leader, reducing
cross-AZ traffic and tail latency.

Implementation:
* Enable FetchRequest v5..v11 in the protocol layer with a per-version
  builder that fills the new fields (log_start_offset, current_leader_epoch,
  session_id/epoch, forgotten_topics_data, rack_id) appropriately.
* Maintain a per-partition preferred-read-replica cache in Fetcher,
  populated from FetchResponse v11 and invalidated on TTL expiry, on
  -1 from the broker, when the node leaves cluster metadata, or on
  NotLeaderForPartition / UnknownTopicOrPartition / OffsetOutOfRange.
* Route fetches through the cache via a new _select_read_replica helper
  instead of always using the partition leader.
* Documentation in docs/consumer.rst including required broker-side
  configuration (broker.rack and replica.selector.class).
* 12 broker-less unit tests covering both protocol and runtime cache
  behavior.

Upstream PR: aio-libs#1158
[DI-5741] Add rack awareness to aiokafka
@ods
Collaborator

ods commented Apr 21, 2026

@vmaurin, could you please take a look too?

Comment thread tests/test_rack_awareness.py Fixed
Comment thread aiokafka/consumer/consumer.py Outdated
Comment thread aiokafka/consumer/fetcher.py Outdated
Comment thread aiokafka/protocol/fetch.py Outdated
Comment thread aiokafka/protocol/fetch.py
Contributor

@vmaurin vmaurin left a comment


  • Implementation of KIP-392 sounds correct (added a few comments)
  • v11 fetch request/response is not using flexible versions yet, so we should be fine here
  • I am not so sure about the implications of KIP-320 (v9) and incremental fetch support (v7), but it sounds like we pass default values there, so maybe we should just make sure tests work well with brokers using these versions of the protocol (probably already in the list)

partitions_by_topic.append((topic, new_parts))

args: list = [
    -1,  # replica_id
Contributor


I am a bit unsure whether we should still send -1 when consuming from a follower, but it seems to be valid when I look at the Java client code https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L317

I am just a bit confused by this sentence in the KIP

the FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower.

Contributor Author


Yes, the Java client does the same: -1 means "I'm a consumer", not "only fetch from the leader".
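The version-dependent request layout discussed in this thread can be sketched as a plain builder (the function name and constant values are illustrative; the field order follows the Kafka Fetch protocol):

```python
def build_fetch_args(api_version: int, rack_id: str = "") -> list:
    """Sketch of how FetchRequest fields accrete across versions v0..v11."""
    args: list = [
        -1,   # replica_id: consumers always send -1, even when the fetch
              # is routed to a follower (KIP-392, matching the Java client)
        500,  # max_wait_ms
        1,    # min_bytes
    ]
    if api_version >= 3:
        args.append(52428800)  # max_bytes
    if api_version >= 4:
        args.append(0)         # isolation_level (0 = read_uncommitted)
    if api_version >= 7:
        args.extend([0, -1])   # session_id, session_epoch (full fetch)
    args.append([])            # topics (empty here for brevity)
    if api_version >= 7:
        args.append([])        # forgotten_topics_data
    if api_version >= 11:
        args.append(rack_id)   # rack_id enables rack-aware replica selection
    return args
```

Below v11 the `rack_id` field simply does not exist on the wire, which is why setting `client_rack` against an older broker can only fall back to leader fetching.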

Comment thread aiokafka/protocol/fetch.py Outdated
args.append(self._rack_id)
return request_struct_class(*args)

if self._isolation_level:
Contributor


Should we raise a similar exception if someone sets a rack_id while the broker doesn't support v11?

  • yes: you see the error and can maybe fix your consumer config / update the broker. On the other hand, your consumer keeps crashing
  • no: the consumer "works", but it will never consume from a follower in the same rack

Contributor Author


No, silent fallback is what Java does. But worth a log warning.

Comment thread aiokafka/protocol/fetch.py
Comment thread aiokafka/consumer/fetcher.py Outdated
Comment thread aiokafka/consumer/fetcher.py Outdated
Comment thread aiokafka/consumer/fetcher.py Outdated
Review changes:
- Default `client_rack` to `None` instead of `""` in Consumer, Fetcher,
  and FetchRequest so the wire encoding sends null (-1) rather than an
  empty string (0) when rack awareness is disabled, matching the Java client.
- Remove unused `rack_id` property from FetchRequest.
- Remove `metadata_max_age_ms` param from Fetcher; read the TTL for the
  preferred-replica cache directly from `client._metadata_max_age_ms`.
  This also removes `self._metadata_max_age_ms` from AIOKafkaConsumer.
- Rename `node_id` to `preferred_read_replica` in
  `_update_preferred_read_replica` for readability.
- Make `responder_node_id` a required keyword argument (no default);
  remove the conservative `None` fallback branch and its test since
  the caller always knows who responded.
- Rename local `parts`/`new_parts` to `partitions`/`new_partitions`
  in FetchRequest.build().

CI fixes:
- Add explicit type annotations (`list[tuple[int, ...]]`,
  `list[object]`) in FetchRequest.build() to satisfy mypy.
…er only

  supports FetchRequest < v11 (rack-aware fetching silently falls back
  to the partition leader).
@codecov

codecov Bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 99.37304% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.95%. Comparing base (870a52f) to head (f6175b6).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
aiokafka/consumer/fetcher.py 96.22% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1159      +/-   ##
==========================================
+ Coverage   94.87%   94.95%   +0.08%     
==========================================
  Files          88       89       +1     
  Lines       15743    16041     +298     
  Branches     1380     1397      +17     
==========================================
+ Hits        14936    15232     +296     
  Misses        558      558              
- Partials      249      251       +2     
Flag Coverage Δ
cext 94.91% <99.37%> (+0.08%) ⬆️
integration 94.83% <99.37%> (+0.07%) ⬆️
purepy 94.91% <99.37%> (+0.08%) ⬆️
unit 52.33% <93.73%> (+0.91%) ⬆️


… test flakiness

- Revert rack_id parameter from `str | None = None` back to `str = ""`,
  matching the Kafka protocol definition (STRING, not NULLABLE_STRING).
- Remove `or ""` fallback in FetchRequest.build_request() since rack_id
  is now always a string.
- Add `or ""` at the call site in fetcher.py to convert None client_rack
  to empty string.
- Replace time.sleep-based TTL expiry test with direct cache manipulation
  to fix flakiness on Windows (coarse timer resolution).
- Add tests for rack version warning logging:
  - test_warning_logged_when_broker_below_v11: verifies warning is logged
    once when client_rack is set but broker doesn't support FetchRequest v11.
  - test_no_warning_when_rack_not_set: verifies no warning when client_rack
    is None.
- Evict cached preferred replica on fetch transport failure so the next
  attempt falls back to the leader instead of waiting up to
  metadata_max_age_ms (P1).
- Route ListOffsets / offset-reset traffic to the partition leader, not
  the preferred follower; keep rack-aware routing scoped to Fetch as
  KIP-392 requires (P2).
- Treat an unknown leader during reset routing as stale metadata: skip
  the partition and signal invalid_metadata so the client forces a
  refresh, instead of enqueuing under None/-1 (P3).

Add 4 new tests covering all three paths.
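The routing split from the commit message above, sketched with hypothetical helpers and a minimal stand-in for cluster metadata (none of these names are aiokafka's actual API):

```python
from dataclasses import dataclass


@dataclass
class ClusterView:
    """Minimal illustrative stand-in for cluster metadata."""
    leader: dict      # tp -> leader node id (None or -1 when unknown)
    live_nodes: set   # node ids currently present in metadata


def node_for_fetch(tp, preferred_cache, cluster):
    """Fetch requests honour the preferred-replica cache while the cached
    node is still alive; otherwise fall back to the partition leader."""
    node = preferred_cache.get(tp)
    if node is not None and node in cluster.live_nodes:
        return node
    return cluster.leader.get(tp)


def node_for_list_offsets(tp, cluster):
    """ListOffsets / offset-reset traffic always targets the leader
    (KIP-392 scopes rack-aware routing to Fetch only). An unknown
    leader means stale metadata: skip the partition and force a refresh."""
    leader = cluster.leader.get(tp)
    return None if leader in (None, -1) else leader
```

The `None` return in the second helper corresponds to the P3 fix: signal invalid metadata instead of enqueuing the partition under None/-1.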
fix formatting
fix(protocol): default FetchRequest rack_id to "" to fix v11+ serialization against Kafka brokers
Comment thread aiokafka/consumer/fetcher.py Outdated
Comment on lines +722 to +729
if preferred_read_replica is None or preferred_read_replica == -1:
    leader = self._client.cluster.leader_for_partition(tp)
    # Only drop the cache if the leader itself told us there is no
    # preferred replica. A `-1` from the previously chosen follower
    # means "I am still the right choice" — keep it.
    if responder_node_id == leader:
        self._preferred_read_replica.pop(tp, None)
    return
Contributor


Maybe this whole block is not needed:

  • either we just sent the fetch request to the leader, meaning the cache was empty or pointing to the leader, and the leader is basically telling us "keep going"
  • or we just sent the request to a replica, which returns -1 to say "don't change the cache"

So maybe the whole function could just update the cache when

if preferred_read_replica is not None and preferred_read_replica >= 0:
    self._preferred_read_replica[tp] = (
        preferred_read_replica,
        time.monotonic() + self._preferred_replica_ttl,
    )

Cache invalidation happens on error, when the timer expires, or when a broker tells us to look elsewhere

Contributor Author

@GlebShipilov GlebShipilov Apr 22, 2026


Yes, you are right. The two cases you outlined are exhaustive (the routing layer reads the same cache, so responder == leader always implies the cache is already empty or already pointing at the leader), and TTL + transport-failure eviction + the routing-error path already cover real invalidation.
I'll collapse the function to the update-only branch and update test_minus_one_from_leader_clears_cache to assert "no entry is created" instead of "entry is dropped". Pushing in the next commit.

Collaborator


@GlebShipilov Please also add this justification in a comment

…ead_replica

The leader-vs-follower invalidation branch is unreachable: when the
fetch was routed to the leader, the cache for that partition was
already empty by construction. Collapse to a single guarded write
and rely on TTL / transport-failure / routing-error paths for real
invalidation. Per @vmaurin's review.
@GlebShipilov
Contributor Author

GlebShipilov commented Apr 22, 2026

I can see that there are some failed tests.

All 5 failing CI jobs are not caused by the rack-awareness changes in this PR.
They fall into two categories:

  1. Producer timeout flake (Python 3.11 / Kafka 2.8.1)
    test_producer_send_leader_notfound fails because its request_timeout_ms=200 is too tight. The broker doesn't finish auto-creating the topic within 200 ms on a busy GHA runner. The two accompanying teardown ERRORs (test_producer_warn_unclosed) are cascading resource warnings from the un-closed producer. A re-run would likely pass.
    I can bump request_timeout_ms in that test, or just leave it as-is, maybe re-run will help.
    Also, what I can do to fix it: call await self.wait_topic(producer.client, self.topic) before the mock, so metadata is already cached when send() runs. The 200ms timeout then only applies to the actual send (which is what the test intends to exercise: the leader-not-found retry path), not the topic creation.

  2. Python 3.14 transactional consumer regression (all four 3.14 jobs)
    test_consumer_transactional_commit and test_consumer_transactional_abort fail with AssertionError: 0 != 3 on every Kafka version in the matrix (0.11.0.3, 1.1.1, 2.1.1, 2.2.2) but only on Python 3.14. The same tests pass on Python 3.11. The assertions at lines 74-75 and 133-134 expect last_stable_offset / highwater to be 3 after a transaction commit/abort, but get 0, suggesting the consumer's fetch loop doesn't complete an offset update in time under Python 3.14's changed asyncio scheduler.
    This looks like a pre-existing asyncio compat problem. I can increase the asyncio.sleep(1) in those tests, add a retry/poll loop for the offset assertions, or mark them xfail on 3.14, but I'd rather not mask it if you'd prefer a proper upstream investigation.

What do you think?

Comment thread aiokafka/consumer/fetcher.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Comment thread tests/test_rack_awareness.py Outdated
Annotate `preferred_read_replica` as `int` and drop the redundant
`is not None` guard. The value always comes from FetchResponse v11+
parsing, which yields an int (`-1` when absent), so `None` is no longer
reachable.
…r, trim per-fetch overhead

Align KIP-392 preferred-replica handling with the Java consumer and cut
work done on every fetch response when the feature is inactive.

* Invalidate cached preferred replica on any non-NoError code.
  Previously only NotLeaderForPartition / UnknownTopicOrPartition and
  OffsetOutOfRange evicted the cache, so any other partition-level error
  kept routing fetches to a follower the broker had implicitly rejected
  until the TTL expired. The per-error branches are now folded into a
  single `else:` that pops `_preferred_read_replica[tp]` first and only
  then dispatches the error-specific handling (metadata refresh, offset
  reset, auth warning, fallback log). Matches Java consumer behaviour.

* Compute `rack_aware` once per response.
  KIP-392 only applies when `client_rack` is set AND the broker speaks
  FetchRequest v11+. Previously every partition in every response paid
  for a `part_data[3]` lookup and an `_update_preferred_read_replica`
  call, even though a non-rack-aware setup is guaranteed to receive -1
  and never update the cache. The per-partition update is now gated by
  a single `rack_aware` boolean computed once per response.

* Make the v<11 fallback warning truly one-shot.
  The old `client_rack and API_VERSION < 11 and not _rack_warning_logged`
  triple-check ran on every fetch response for the lifetime of the
  consumer, even though the warning itself was emitted only once. The
  check now reuses `rack_aware`, short-circuits on `bool(self._client_rack)`
  for the common (unset) case, and a comment makes the latch explicit.
Comment thread docs/consumer.rst Outdated
tests/test_rack_awareness.py (review feedback):
- Remove unused `from __future__ import annotations`
- Remove `test_fetch_request_v9_does_not_carry_rack_id` (no requirement)
- Drop redundant `@pytest.mark.asyncio` (project uses auto mode)
- Remove unused `import pytest`
- Fix `_make_fetcher` annotation: `str` → `str | None`
- Use `mock.patch.object(asyncio, "sleep", ...)` instead of string path
- Replace `mock.patch("...create_task")` with async no-op routine

tests/test_transactional_consumer.py (Py3.14 flake fix):
Replace racy `asyncio.sleep(1)` + bare offset assertions with
`await consumer.getmany(timeout_ms=1000)` — a deterministic fetch
round-trip that guarantees the transaction control marker has been
processed and last_stable_offset/highwater are updated before asserting.
Fixes consistent `AssertionError: 0 != 3` on all Py3.14 CI jobs.

tests/test_producer.py (leader-not-found flake fix):
Warm up the topic with a separate default-timeout producer before
constructing the 200ms producer. Separates topic auto-creation from the
leader-not-found retry path the test exercises. Use `add_cleanup` to
prevent unclosed producer resource warnings on failure.
Comment thread tests/test_rack_awareness.py Fixed
@ods
Collaborator

ods commented Apr 23, 2026

Thanks, @GlebShipilov — really high-quality work! It looks ready to merge to me. @vmaurin, your opinion?

@vmaurin
Contributor

vmaurin commented Apr 23, 2026

> Thanks, @GlebShipilov — really high-quality work! It looks ready to merge to me. @vmaurin, your opinion?

Yes, it looks good, thanks @GlebShipilov! I haven't had a chance to test it, but I feel it could be merged to the main branch.

I think the most important thing before a release would be to check if it doesn't break for people not using the rack awareness feature

@ods
Collaborator

ods commented Apr 23, 2026

> I think the most important thing before a release would be to check if it doesn't break for people not using the rack awareness feature

We already did this for our setup. And I'm going to release beta version first, so that more people can do this.

@ods ods merged commit 57fa870 into aio-libs:master Apr 23, 2026
44 of 54 checks passed